Assignment 03

Author
Affiliation

Tracy Anyasi

Boston University

Published

November 21, 2024

Modified

September 23, 2025

1 Load the dataset

import pandas as pd
import plotly.express as px
import plotly.io as pio
from pyspark.sql import SparkSession
import re
import numpy as np
import plotly.graph_objects as go
from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when
from pyspark.sql import functions as F
from pyspark.sql.functions import col, monotonically_increasing_id

# Seed NumPy's global random number generator for reproducibility.
np.random.seed(51)

# Render Plotly figures inline in the notebook.
pio.renderers.default = "notebook"

# Get the existing Spark session, or create one named "LightcastData".
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load the Lightcast postings CSV.
#   header/inferSchema: take column names from the first row and infer types.
#   multiLine: allow a quoted record to span several physical lines.
#   escape='"': treat a doubled quote inside a quoted field as a literal quote.
df = spark.read.csv(
    "data/lightcast_job_postings.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    escape='"',
)
df.createOrReplaceTempView("job_postings")

# Diagnostics only -- keep commented out when rendering the submission:
# df.printSchema()
# df.show(5)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/23 01:28:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 1:>                                                          (0 + 1) / 1]                                                                                25/09/23 01:28:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
## Casting salary / experience columns to float and stripping embedded
## line breaks (\n, \r) from the education-level labels.
# The chain is wrapped in parentheses rather than continued with trailing
# backslashes, so a stray "\" can never glue the next line onto it.
df = (
    df.withColumn("SALARY_FROM", col("SALARY_FROM").cast("float"))
    .withColumn("SALARY_TO", col("SALARY_TO").cast("float"))
    .withColumn("SALARY", col("SALARY").cast("float"))
    .withColumn("MIN_YEARS_EXPERIENCE", col("MIN_YEARS_EXPERIENCE").cast("float"))
    .withColumn("MAX_YEARS_EXPERIENCE", col("MAX_YEARS_EXPERIENCE").cast("float"))
    .withColumn(
        "EDUCATION_LEVELS_NAME",
        regexp_replace(col("EDUCATION_LEVELS_NAME"), r"[\n\r]", ""),
    )
)

## Computing Medians
def compute_median(sdf, col_name, relative_error=0.01):
    """Return the approximate median of one numeric column of a Spark DataFrame.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        DataFrame that holds the column.
    col_name : str
        Name of the column to summarise.
    relative_error : float, optional
        Target precision forwarded to ``DataFrame.approxQuantile``
        (0 asks Spark for the exact quantile, which is more expensive).
        Defaults to 0.01.

    Returns
    -------
    float or None
        The approximate median, or ``None`` when ``approxQuantile`` returns
        an empty list (Spark does this for a column holding only nulls).
    """
    quantiles = sdf.approxQuantile(col_name, [0.5], relative_error)
    return quantiles[0] if quantiles else None

# Approximate medians of the three salary columns, computed before any
# imputation so they reflect only the observed (non-null) values.
median_from, median_to, median_salary = [
    compute_median(df, salary_col)
    for salary_col in ("SALARY_FROM", "SALARY_TO", "SALARY")
]

print("Medians:", median_from, median_to, median_salary)

## Impute missing salary values using the column medians computed above.
# compute_median returns None for a column with no non-null values, and
# DataFrame.fillna only accepts int/float/str/bool replacement values, so
# such columns are skipped (left as-is) instead of raising.
salary_fill_values = {
    column_name: median_value
    for column_name, median_value in (
        ("SALARY_FROM", median_from),
        ("SALARY_TO", median_to),
        ("SALARY", median_salary),
    )
    if median_value is not None
}
if salary_fill_values:
    df = df.fillna(salary_fill_values)

## Remove postings with no employment type, then add Average_Salary as the
## midpoint of the (imputed) SALARY_FROM / SALARY_TO range.
# The two steps are independent: the filter does not read Average_Salary.
df = df.dropna(subset=["EMPLOYMENT_TYPE_NAME"])
df = df.withColumn(
    "Average_Salary",
    (F.col("SALARY_FROM") + F.col("SALARY_TO")) / 2,
)

# df.select("Average_Salary", "SALARY", "EDUCATION_LEVELS_NAME", "REMOTE_TYPE_NAME", "MAX_YEARS_EXPERIENCE", "LOT_V6_SPECIALIZED_OCCUPATION_NAME").show(5, truncate=False)


## Columns carried into the pandas frame used for export and plotting.
## Order matters: it is the column order of the exported CSV.
export_cols = [
    "EDUCATION_LEVELS_NAME",
    "REMOTE_TYPE_NAME",
    "MAX_YEARS_EXPERIENCE",
    "Average_Salary",
    "SALARY_TO",
    "SALARY_FROM",
    "SALARY",
    "LOT_V6_SPECIALIZED_OCCUPATION_NAME",
    "LOT_OCCUPATION_NAME",
    "NAICS2_NAME",
    "EMPLOYMENT_TYPE_NAME",
    "MIN_YEARS_EXPERIENCE",
]

df_selected = df.select(*export_cols)

# Collect the selected columns into pandas.
pdf = df_selected.toPandas()

# Clean the text columns BEFORE exporting so lightcast_cleaned.csv really
# contains the cleaned labels.
# Strip non-ASCII characters from the employment type labels (nulls in this
# column were already dropped in Spark, so astype(str) is safe here).
pdf["EMPLOYMENT_TYPE_NAME"] = (
    pdf["EMPLOYMENT_TYPE_NAME"]
    .astype(str)
    .str.replace(r"[^\x00-\x7F]+", "", regex=True)
)
# Strip line breaks, backslashes, double quotes and square brackets from the
# education labels. Using the .str accessor directly (no astype(str)) keeps
# missing values missing instead of turning them into the text "None".
pdf["EDUCATION_LEVELS_NAME"] = pdf["EDUCATION_LEVELS_NAME"].str.replace(
    r"[\n\r\\\"\[\]]", "", regex=True
)

## export
pdf.to_csv("lightcast_cleaned.csv", index=False)

print(pdf.columns.tolist())

print("Data cleaning complete. Row retained:", len(pdf))
[Stage 2:>                                                          (0 + 1) / 1]                                                                                [Stage 3:>                                                          (0 + 1) / 1]                                                                                [Stage 4:>                                                          (0 + 1) / 1]                                                                                
Medians: 87295.0 130042.0 115024.0
[Stage 5:>                                                          (0 + 1) / 1]                                                                                
['EDUCATION_LEVELS_NAME', 'REMOTE_TYPE_NAME', 'MAX_YEARS_EXPERIENCE', 'Average_Salary', 'SALARY_TO', 'SALARY_FROM', 'SALARY', 'LOT_V6_SPECIALIZED_OCCUPATION_NAME', 'LOT_OCCUPATION_NAME', 'NAICS2_NAME', 'EMPLOYMENT_TYPE_NAME', 'MIN_YEARS_EXPERIENCE']
Data cleaning complete. Row retained: 72454

#Question 1a - Salary Distribution by Industry

# Box plot of SALARY for each industry (NAICS2_NAME); only outliers are drawn
# as individual points. update_layout returns the figure, so it is chained.
fig = px.box(
    data_frame=pdf,
    x="NAICS2_NAME",
    y="SALARY",
    points="outliers",
    color_discrete_sequence=["purple"],
    title="Salary Distribution by Industry",
).update_layout(
    title_font_size=16,
    font_family="Times New Roman",
    xaxis_tickangle=45,
    xaxis_title="Industry",
    yaxis_title="Salary",
)

fig.show()
fig.write_html("Q1a.html")
# fig.write_image("Q1a.png")

#Analysis:

#Question 1b - Salary Distribution by Employment Type

# Box plot of SALARY for each employment type; only outliers are drawn as
# individual points. update_layout returns the figure, so it is chained.
fig = px.box(
    data_frame=pdf,
    x="EMPLOYMENT_TYPE_NAME",
    y="SALARY",
    points="outliers",
    color_discrete_sequence=["orange"],
    title="Salary Distribution by Employment Type",
).update_layout(
    title_font_size=16,
    font_family="Times New Roman",
    xaxis_tickangle=45,
    xaxis_title="Employment Type",
    yaxis_title="Salary",
)

fig.show()
fig.write_html("Q1b.html")
# fig.write_image("Q1b.png")

#Analysis:

#Question 2 - Salary Analysis by ONET Occupation Type

# Aggregate per occupation (LOT_OCCUPATION_NAME): median SALARY and the
# number of postings with a non-null SALARY, one row per occupation.
saonet = pdf.groupby("LOT_OCCUPATION_NAME").agg(
    median_salary=("SALARY", "median"),
    job_count=("SALARY", "count"),
).reset_index()

# Bubble chart: size = number of postings, colour = median salary.
# A continuous colour scale only takes effect when `color` maps a numeric
# column, and it must be a scale name (or a list of valid CSS colours);
# the previous ["plasma"] with no `color` mapping was silently ignored.
fig = px.scatter(
    saonet,
    x="LOT_OCCUPATION_NAME",
    y="median_salary",
    size="job_count",
    size_max=60,
    color="median_salary",
    color_continuous_scale="plasma",
    title="Salary Analysis by Occupation",
)

fig.update_layout(
    font_family="Times New Roman",
    title_font_size=16,
    xaxis_title="Occupation",
    yaxis_title="Median Salary",
    xaxis_tickangle=45,
    coloraxis_colorbar_title="Median Salary",
)

fig.show()
fig.write_html("Q2.html")
# fig.write_image("Q2.png")